use crate::{
    channel::ProtocolsError,
    message::{partial_eq_bincode, Message},
    participant::{A2bStreamOpen, S2bShutdownBparticipant},
    scheduler::{A2sConnect, Scheduler},
};
use bytes::Bytes;
use hashbrown::HashMap;
#[cfg(feature = "compression")]
use lz_fear::raw::DecodeError;
use network_protocol::{Bandwidth, InitProtocolError, Pid, Prio, Promises, Sid};
#[cfg(feature = "metrics")]
use prometheus::Registry;
use serde::{de::DeserializeOwned, Serialize};
use std::{
    net::SocketAddr,
    sync::{
        atomic::{AtomicBool, Ordering},
        Arc,
    },
    time::Duration,
};
use tokio::{
    io,
    runtime::Runtime,
    sync::{mpsc, oneshot, watch, Mutex},
};
use tracing::*;

/// Shared, lockable slot holding the sender that is used to request the
/// shutdown of a single participant. Whoever requests the disconnect first
/// `take`s the sender out of the slot, so an empty slot (`None`) means that a
/// disconnect was already requested.
type A2sDisconnect = Arc<Mutex<Option<mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>>>>;

/// Represents a Tcp, Quic, Udp or Mpsc connection address, it is the
/// address type accepted by [`Network::connect`]
#[derive(Clone, Debug)]
pub enum ConnectAddr {
    /// Connect via Tcp to the given socket address
    Tcp(SocketAddr),
    /// Connect via Udp to the given socket address
    Udp(SocketAddr),
    /// Connect via Quic to the given socket address using the given client
    /// configuration, only available with the `quic` feature
    // NOTE(review): the `String` is presumably the server name used during the
    // Quic handshake - confirm against the scheduler
    #[cfg(feature = "quic")]
    Quic(SocketAddr, quinn::ClientConfig, String),
    /// Connect via Mpsc
    // NOTE(review): the `u64` presumably identifies the in-process channel to
    // connect to - confirm against the scheduler
    Mpsc(u64),
}

/// Represents a Tcp, Quic, Udp or Mpsc listen address, it is the address
/// type accepted by [`Network::listen`]
#[derive(Clone, Debug)]
pub enum ListenAddr {
    /// Listen via Tcp on the given socket address
    Tcp(SocketAddr),
    /// Listen via Udp on the given socket address
    Udp(SocketAddr),
    /// Listen via Quic on the given socket address using the given server
    /// configuration, only available with the `quic` feature
    #[cfg(feature = "quic")]
    Quic(SocketAddr, quinn::ServerConfig),
    /// Listen via Mpsc
    // NOTE(review): the `u64` presumably identifies the in-process channel
    // that remotes connect to - confirm against the scheduler
    Mpsc(u64),
}

/// `Participants` are generated by the [`Network`] and represent a connection
/// to a remote Participant. Look at the [`connect`] and [`connected`] method of
/// [`Networks`] on how to generate `Participants`
///
/// [`Networks`]: crate::api::Network
/// [`connect`]: Network::connect
/// [`connected`]: Network::connected
// NOTE(review): the field prefixes presumably follow a `<from>2<to>` scheme
// with a = api, b = bparticipant and s = scheduler - confirm
pub struct Participant {
    /// Pid of the local side, only used as field of the tracing spans here
    local_pid: Pid,
    /// Pid of the remote side, see [`Participant::remote_pid`]
    remote_pid: Pid,
    /// Requests to open a new [`Stream`] are sent through this channel
    a2b_open_stream_s: Mutex<mpsc::UnboundedSender<A2bStreamOpen>>,
    /// [`Streams`](Stream) returned by [`Participant::opened`] are received
    /// from this channel
    b2a_stream_opened_r: Mutex<mpsc::UnboundedReceiver<Stream>>,
    /// Latest bandwidth approximation, see [`Participant::bandwidth`]
    b2a_bandwidth_stats_r: watch::Receiver<f32>,
    /// Slot with the sender used to request the shutdown of this
    /// `Participant`, it is emptied by the first disconnect request
    a2s_disconnect_s: A2sDisconnect,
}

/// `Streams` represents a channel to send `n` messages with a certain priority
/// and [`Promises`]. Messages always need to be sent between 2 `Streams`.
///
/// `Streams` are generated by the [`Participant`].
/// Look at the [`open`] and [`opened`] method of [`Participant`] on how to
/// generate `Streams`
///
/// Unlike [`Network`] and [`Participant`], `Streams` don't implement interior
/// mutability, as multiple threads don't need access to the same `Stream`.
///
/// [`Networks`]: crate::api::Network
/// [`open`]: Participant::open
/// [`opened`]: Participant::opened
#[derive(Debug)]
pub struct Stream {
    /// Pid of the local side
    local_pid: Pid,
    /// Pid of the remote side
    remote_pid: Pid,
    /// Id of this `Stream`
    sid: Sid,
    // NOTE(review): presumably only stored for completeness and never read,
    // hence the `allow` - confirm
    #[allow(dead_code)]
    prio: Prio,
    /// [`Promises`] this `Stream` was opened with
    promises: Promises,
    // NOTE(review): presumably only stored for completeness and never read,
    // hence the `allow` - confirm
    #[allow(dead_code)]
    guaranteed_bandwidth: Bandwidth,
    // NOTE(review): presumably set once no more messages can be sent on this
    // `Stream` - confirm against the code setting the flag
    send_closed: Arc<AtomicBool>,
    /// Channel for outgoing messages, each message is tagged with a [`Sid`]
    a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>,
    /// Channel for incoming messages, wrapped in an `Option` so that it can be
    /// removed from the `Stream`
    b2a_msg_recv_r: Option<async_channel::Receiver<Bytes>>,
    /// Channel taking the [`Sid`] of a `Stream` that should be closed, wrapped
    /// in an `Option` so that it can be removed from the `Stream`
    a2b_close_stream_s: Option<mpsc::UnboundedSender<Sid>>,
}

/// Error type thrown by [`Networks`](Network) methods
#[derive(Debug)]
pub enum NetworkError {
    /// The `Network` is closed, e.g. returned by [`Network::connected`] when
    /// no further `Participants` can be received
    NetworkClosed,
    /// [`Network::listen`] failed with the contained I/O error (e.g. port
    /// already used)
    ListenFailed(std::io::Error),
    /// [`Network::connect`] failed, see [`NetworkConnectError`] for the
    /// reason
    ConnectFailed(NetworkConnectError),
}

/// Error type thrown by [`Networks`](Network) connect, it is wrapped in
/// [`NetworkError::ConnectFailed`]
#[derive(Debug)]
pub enum NetworkConnectError {
    /// Either a Pid UUID clash or you are trying to hijack a connection
    InvalidSecret,
    /// Initializing the protocol with the remote side failed
    Handshake(InitProtocolError<ProtocolsError>),
    /// An I/O error occurred while connecting
    Io(std::io::Error),
}

/// Error type thrown by [`Participants`](Participant) methods
#[derive(Debug, PartialEq, Clone)]
pub enum ParticipantError {
    /// Participant was closed by remote side
    ParticipantDisconnected,
    /// Underlying Protocol failed and wasn't able to recover, expect some Data
    /// loss unfortunately, there is no method to get the exact messages
    /// that failed. This is also returned when local side tries to do
    /// something while remote side gracefully disconnects
    ProtocolFailedUnrecoverable,
}

/// Error type thrown by [`Streams`](Stream) methods
/// A Compression Error should only happen if a client sends malicious code.
/// A Deserialize Error probably means you are expecting Type X while you
/// were actually sent type Y.
#[derive(Debug)]
pub enum StreamError {
    /// The `Stream` is closed
    StreamClosed,
    /// Decompressing a message failed, only available with the `compression`
    /// feature
    #[cfg(feature = "compression")]
    Compression(DecodeError),
    /// Deserializing a message via `bincode` failed
    Deserialize(bincode::Error),
}

/// All Parameters of a Stream, can be used to generate RawMessages
#[derive(Debug, Clone)]
pub struct StreamParams {
    /// The [`Promises`] of the `Stream` these parameters belong to
    pub(crate) promises: Promises,
}

/// Use the `Network` to create connections to other [`Participants`]
///
/// The `Network` is the single source that handles all connections in your
/// Application. You can pass it around multiple threads in an
/// [`Arc`](std::sync::Arc) as all commands have internal mutability.
///
/// The `Network` has methods to [`connect`] to other [`Participants`] actively
/// via their [`ConnectAddr`], or [`listen`] passively for [`connected`]
/// [`Participants`] via [`ListenAddr`].
///
/// To guarantee a clean shutdown, the [`Runtime`] MUST NOT be dropped before
/// the Network.
///
/// # Examples
/// ```rust
/// use tokio::runtime::Runtime;
/// use veloren_network::{Network, ConnectAddr, ListenAddr, Pid};
///
/// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
/// // Create a Network, listen on port `2999` to accept connections and connect to port `8080` to connect to a (pseudo) database Application
/// let runtime = Runtime::new().unwrap();
/// let network = Network::new(Pid::new(), &runtime);
/// runtime.block_on(async{
///     # //setup pseudo database!
///     # let database = Network::new(Pid::new(), &runtime);
///     # database.listen(ListenAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
///     network.listen(ListenAddr::Tcp("127.0.0.1:2999".parse().unwrap())).await?;
///     let database = network.connect(ConnectAddr::Tcp("127.0.0.1:8080".parse().unwrap())).await?;
///     drop(network);
///     # drop(database);
///     # Ok(())
/// })
/// # }
/// ```
///
/// [`Participants`]: crate::api::Participant
/// [`Runtime`]: tokio::runtime::Runtime
/// [`connect`]: Network::connect
/// [`listen`]: Network::listen
/// [`connected`]: Network::connected
/// [`ConnectAddr`]: crate::api::ConnectAddr
/// [`ListenAddr`]: crate::api::ListenAddr
pub struct Network {
    /// Pid of this `Network`, used as field of the tracing spans
    local_pid: Pid,
    /// Disconnect slots of all `Participants` handed out by `connect` and
    /// `connected`, keyed by their remote Pid. Drained when the `Network`
    /// shuts down in order to close the remaining `Participants`
    participant_disconnect_sender: Arc<Mutex<HashMap<Pid, A2sDisconnect>>>,
    /// Listen requests are sent through this channel, together with a sender
    /// for the result of the request
    listen_sender: Mutex<mpsc::UnboundedSender<(ListenAddr, oneshot::Sender<io::Result<()>>)>>,
    /// Connect requests are sent through this channel
    connect_sender: Mutex<mpsc::UnboundedSender<A2sConnect>>,
    /// `Participants` returned by `connected` are received from this channel
    connected_receiver: Mutex<mpsc::UnboundedReceiver<Participant>>,
    /// Triggers the shutdown manager task, the sender passed through it is
    /// used by that task to report that the shutdown has finished
    // NOTE(review): presumably wrapped in an `Option` so that it can be taken
    // in the non-async `Drop` - confirm against the `Drop` impl
    shutdown_network_s: Option<oneshot::Sender<oneshot::Sender<()>>>,
}

impl Network {
    /// Generates a new `Network` to handle all connections in an Application
    ///
    /// # Arguments
    /// * `participant_id` - provide it by calling [`Pid::new()`], usually you
    ///   don't want to reuse a Pid for 2 `Networks`
    /// * `runtime` - provide a [`Runtime`], it's used to internally spawn
    ///   tasks. It is necessary to clean up in the non-async `Drop`. **All**
    ///   network related components **must** be dropped before the runtime is
    ///   stopped. Dropping the runtime while a shutdown is still in progress
    ///   leaves the network in a bad state which might cause a panic!
    ///
    /// # Result
    /// * `Self` - returns a `Network` which can be `Send` to multiple areas of
    ///   your code, including multiple threads. This is the base struct of
    ///   this crate.
    ///
    /// # Examples
    /// ```rust
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, Pid};
    ///
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// ```
    ///
    /// Usually you only create a single `Network` for an application,
    /// except when client and server are in the same application, then you
    /// will want 2. However there are no technical limitations from
    /// creating more.
    ///
    /// [`Pid::new()`]: network_protocol::Pid::new
    /// [`Runtime`]: tokio::runtime::Runtime
    pub fn new(participant_id: Pid, runtime: &Runtime) -> Self {
        Self::internal_new(
            participant_id,
            runtime,
            // the registry argument only exists with the `metrics` feature,
            // no registry is passed by this constructor
            #[cfg(feature = "metrics")]
            None,
        )
    }

    /// See [`new`], only available with the `metrics` feature
    ///
    /// # additional Arguments
    /// * `registry` - Provide a `Registry` in order to collect Prometheus
    ///   metrics of this `Network`. Use [`new`] to create a `Network` without
    ///   a `Registry`. Metrics are collected via [`prometheus`]
    ///
    /// # Examples
    /// ```rust
    /// use prometheus::Registry;
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, Pid};
    ///
    /// let runtime = Runtime::new().unwrap();
    /// let registry = Registry::new();
    /// let network = Network::new_with_registry(Pid::new(), &runtime, &registry);
    /// ```
    /// [`new`]: crate::api::Network::new
    #[cfg(feature = "metrics")]
    pub fn new_with_registry(participant_id: Pid, runtime: &Runtime, registry: &Registry) -> Self {
        Self::internal_new(participant_id, runtime, Some(registry))
    }

    /// Shared implementation of the public constructors.
    ///
    /// Creates the `Scheduler` together with its channels and spawns 2 tasks
    /// on `runtime`: the shutdown manager and the scheduler itself. The
    /// `registry` argument only exists with the `metrics` feature.
    fn internal_new(
        participant_id: Pid,
        runtime: &Runtime,
        #[cfg(feature = "metrics")] registry: Option<&Registry>,
    ) -> Self {
        let span = tracing::info_span!("network", p = ?participant_id);
        {
            let _entered = span.enter();
            trace!("Starting Network");
        }

        let (scheduler, listen_s, connect_s, connected_r, shutdown_scheduler_s) = Scheduler::new(
            participant_id,
            #[cfg(feature = "metrics")]
            registry,
        );
        let participant_disconnect_sender = Arc::new(Mutex::new(HashMap::new()));
        let (shutdown_network_s, shutdown_network_r) = oneshot::channel();

        // the shutdown manager shares the map of disconnect slots with the
        // `Network`, so it can close all remaining participants later on
        runtime.spawn(Self::shutdown_mgr(
            participant_id,
            shutdown_network_r,
            Arc::clone(&participant_disconnect_sender),
            shutdown_scheduler_s,
        ));

        let scheduler_task = async move {
            trace!("Starting scheduler in own thread");
            scheduler.run().await;
            trace!("Stopping scheduler and his own thread");
        };
        let scheduler_span = tracing::info_span!("network", p = ?participant_id);
        runtime.spawn(scheduler_task.instrument(scheduler_span));

        Self {
            local_pid: participant_id,
            participant_disconnect_sender,
            listen_sender: Mutex::new(listen_s),
            connect_sender: Mutex::new(connect_s),
            connected_receiver: Mutex::new(connected_r),
            shutdown_network_s: Some(shutdown_network_s),
        }
    }

    /// Starts listening on a [`ListenAddr`].
    /// When the method returns, the `Network` is either ready to listen for
    /// incoming connections OR has returned a [`NetworkError`] (e.g. port
    /// already used). You can call [`connected`] to asynchronously wait for a
    /// [`Participant`] to connect. You can call `listen` on multiple
    /// addresses, e.g. to support multiple Protocols or NICs.
    ///
    /// # Examples
    /// ```ignore
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, Pid, ListenAddr};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on port `2000` TCP on all NICs and `2001` UDP locally
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     network
    ///         .listen(ListenAddr::Tcp("127.0.0.1:2000".parse().unwrap()))
    ///         .await?;
    ///     network
    ///         .listen(ListenAddr::Udp("127.0.0.1:2001".parse().unwrap()))
    ///         .await?;
    ///     drop(network);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`connected`]: Network::connected
    /// [`ListenAddr`]: crate::api::ListenAddr
    #[instrument(name="network", skip(self, address), fields(p = %self.local_pid))]
    pub async fn listen(&self, address: ListenAddr) -> Result<(), NetworkError> {
        let (result_s, result_r) = oneshot::channel::<tokio::io::Result<()>>();
        debug!(?address, "listening on address");
        {
            // scoped, so that the lock is released before waiting for the answer
            let listen_sender = self.listen_sender.lock().await;
            listen_sender.send((address, result_s))?;
        }
        // waiting guarantees that we either listened successfully or get an
        // error like port in use
        result_r.await?.map_err(NetworkError::ListenFailed)
    }

    /// Starts a connection to a [`ConnectAddr`].
    /// When the method returns, the `Network` either hands out a
    /// [`Participant`] ready to open [`Streams`] on OR has returned a
    /// [`NetworkError`] (e.g. can't connect, or invalid Handshake)
    ///
    /// # Examples
    /// ```ignore
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, connect on port `2010` TCP and `2011` UDP like listening above
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     # remote.listen(ListenAddr::Tcp("127.0.0.1:2010".parse().unwrap())).await?;
    ///     # remote.listen(ListenAddr::Udp("127.0.0.1:2011".parse().unwrap())).await?;
    ///     let p1 = network
    ///         .connect(ConnectAddr::Tcp("127.0.0.1:2010".parse().unwrap()))
    ///         .await?;
    ///     # //this doesn't work yet, so skip the test
    ///     # //TODO fixme!
    ///     # return Ok(());
    ///     let p2 = network
    ///         .connect(ConnectAddr::Udp("127.0.0.1:2011".parse().unwrap()))
    ///         .await?;
    ///     assert_eq!(&p1, &p2);
    ///     # Ok(())
    /// })?;
    /// drop(network);
    /// # drop(remote);
    /// # Ok(())
    /// # }
    /// ```
    /// Usually the `Network` guarantees that an operation on a
    /// [`Participant`] succeeds, e.g. by automatic retrying unless it fails
    /// completely e.g. by disconnecting from the remote. If 2
    /// [`ConnectAddr`] you `connect` to belong to the same [`Participant`],
    /// you get the same [`Participant`] as a result. This is useful e.g. by
    /// connecting to the same [`Participant`] via multiple Protocols.
    ///
    /// [`Streams`]: crate::api::Stream
    /// [`ConnectAddr`]: crate::api::ConnectAddr
    #[instrument(name="network", skip(self, address), fields(p = %self.local_pid))]
    pub async fn connect(&self, address: ConnectAddr) -> Result<Participant, NetworkError> {
        let (participant_s, participant_r) =
            oneshot::channel::<Result<Participant, NetworkConnectError>>();
        debug!(?address, "Connect to address");
        {
            // scoped, so that the lock is released before waiting for the answer
            let connect_sender = self.connect_sender.lock().await;
            connect_sender.send((address, participant_s))?;
        }
        let participant = participant_r
            .await?
            .map_err(NetworkError::ConnectFailed)?;
        let remote_pid = participant.remote_pid;
        trace!(?remote_pid, "connected");
        // remember the disconnect slot, so that the participant can be closed
        // when the network shuts down
        let disconnect_slot = Arc::clone(&participant.a2s_disconnect_s);
        self.participant_disconnect_sender
            .lock()
            .await
            .insert(remote_pid, disconnect_slot);
        Ok(participant)
    }

    /// Returns a [`Participant`] created from a [`ListenAddr`] you
    /// called [`listen`] on before. This function either returns a
    /// working [`Participant`] ready to open [`Streams`] on OR a
    /// [`NetworkError`] (e.g. Network got closed)
    ///
    /// # Examples
    /// ```rust
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{ConnectAddr, ListenAddr, Network, Pid};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on port `2020` TCP and opens returns their Pid
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     network
    ///         .listen(ListenAddr::Tcp("127.0.0.1:2020".parse().unwrap()))
    ///         .await?;
    ///     # remote.connect(ConnectAddr::Tcp("127.0.0.1:2020".parse().unwrap())).await?;
    ///     while let Ok(participant) = network.connected().await {
    ///         println!("Participant connected: {}", participant.remote_pid());
    ///         # //skip test here as it would be a endless loop
    ///         # break;
    ///     }
    ///     drop(network);
    ///     # drop(remote);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`Streams`]: crate::api::Stream
    /// [`listen`]: crate::api::Network::listen
    /// [`ListenAddr`]: crate::api::ListenAddr
    #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
    pub async fn connected(&self) -> Result<Participant, NetworkError> {
        let participant = match self.connected_receiver.lock().await.recv().await {
            Some(participant) => participant,
            // the sending side is gone, no participant will ever arrive again
            None => return Err(NetworkError::NetworkClosed),
        };
        // remember the disconnect slot, so that the participant can be closed
        // when the network shuts down
        let disconnect_slot = Arc::clone(&participant.a2s_disconnect_s);
        self.participant_disconnect_sender
            .lock()
            .await
            .insert(participant.remote_pid, disconnect_slot);
        Ok(participant)
    }

    /// Use a mgr to handle shutdown smoothly and not in `Drop`
    #[instrument(name="network", skip(participant_disconnect_sender, shutdown_scheduler_s), fields(p = %local_pid))]
    async fn shutdown_mgr(
        local_pid: Pid,
        shutdown_network_r: oneshot::Receiver<oneshot::Sender<()>>,
        participant_disconnect_sender: Arc<Mutex<HashMap<Pid, A2sDisconnect>>>,
        shutdown_scheduler_s: oneshot::Sender<()>,
    ) {
        trace!("waiting for shutdown triggerNetwork");
        let return_s = shutdown_network_r.await;
        trace!("Shutting down Participants of Network");
        let mut finished_receiver_list = vec![];

        for (remote_pid, a2s_disconnect_s) in participant_disconnect_sender.lock().await.drain() {
            match a2s_disconnect_s.lock().await.take() {
                Some(a2s_disconnect_s) => {
                    trace!(?remote_pid, "Participants will be closed");
                    let (finished_sender, finished_receiver) = oneshot::channel();
                    finished_receiver_list.push((remote_pid, finished_receiver));
                    // If the channel was already dropped, we can assume that the other side
                    // already released its resources.
                    let _ = a2s_disconnect_s
                        .send((remote_pid, (Duration::from_secs(10), finished_sender)));
                },
                None => trace!(?remote_pid, "Participant already disconnected gracefully"),
            }
        }
        //wait after close is requested for all
        for (remote_pid, finished_receiver) in finished_receiver_list.drain(..) {
            match finished_receiver.await {
                Ok(Ok(())) => trace!(?remote_pid, "disconnect successful"),
                Ok(Err(e)) => info!(?remote_pid, ?e, "unclean disconnect"),
                Err(e) => warn!(
                    ?remote_pid,
                    ?e,
                    "Failed to get a message back from the scheduler, seems like the network is \
                     already closed"
                ),
            }
        }

        trace!("Participants have shut down - next: Scheduler");
        if let Err(()) = shutdown_scheduler_s.send(()) {
            error!("Scheduler is closed, but nobody other should be able to close it")
        };
        if let Ok(return_s) = return_s {
            if return_s.send(()).is_err() {
                warn!("Network::drop stoped after a timeout and didn't wait for our shutdown");
            };
        }
        debug!("Network has shut down");
    }
}

impl Participant {
    /// Creates a `Participant` from the pids of both sides and the channels
    /// connecting it to the rest of the crate. The channels are wrapped in
    /// the locks required for the `&self` based api.
    pub(crate) fn new(
        local_pid: Pid,
        remote_pid: Pid,
        a2b_open_stream_s: mpsc::UnboundedSender<A2bStreamOpen>,
        b2a_stream_opened_r: mpsc::UnboundedReceiver<Stream>,
        b2a_bandwidth_stats_r: watch::Receiver<f32>,
        a2s_disconnect_s: mpsc::UnboundedSender<(Pid, S2bShutdownBparticipant)>,
    ) -> Self {
        let a2b_open_stream_s = Mutex::new(a2b_open_stream_s);
        let b2a_stream_opened_r = Mutex::new(b2a_stream_opened_r);
        // filled slot: no disconnect was requested yet
        let a2s_disconnect_s = Arc::new(Mutex::new(Some(a2s_disconnect_s)));
        Self {
            local_pid,
            remote_pid,
            a2b_open_stream_s,
            b2a_stream_opened_r,
            b2a_bandwidth_stats_r,
            a2s_disconnect_s,
        }
    }

    /// Opens a [`Stream`] on this `Participant` with a certain Priority and
    /// [`Promises`]
    ///
    /// # Arguments
    /// * `prio` - defines which stream is processed first when limited on
    ///   bandwidth. See [`Prio`] for documentation.
    /// * `promises` - use a combination of your preferred [`Promises`], see
    ///   the link for further documentation. You can combine them, e.g.
    ///   `Promises::ORDERED | Promises::CONSISTENCY` The Stream will then
    ///   guarantee that those promises are met.
    /// * `bandwidth` - sets a guaranteed bandwidth which is reserved for this
    ///   stream. When excess bandwidth is available it will be used. See
    ///   [`Bandwidth`] for details.
    ///
    /// A [`ParticipantError`] might be thrown if the `Participant` is already
    /// closed. [`Streams`] can be created without an answer from the remote
    /// side, resulting in very fast creation and closing latency.
    ///
    /// # Examples
    /// ```rust
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{ConnectAddr, ListenAddr, Network, Pid, Promises};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, connect on port 2100 and open a stream
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     # remote.listen(ListenAddr::Tcp("127.0.0.1:2100".parse().unwrap())).await?;
    ///     let p1 = network
    ///         .connect(ConnectAddr::Tcp("127.0.0.1:2100".parse().unwrap()))
    ///         .await?;
    ///     let _s1 = p1
    ///         .open(4, Promises::ORDERED | Promises::CONSISTENCY, 1000)
    ///         .await?;
    ///     drop(network);
    ///     # drop(remote);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`Prio`]: network_protocol::Prio
    /// [`Bandwidth`]: network_protocol::Bandwidth
    /// [`Promises`]: network_protocol::Promises
    /// [`Streams`]: crate::api::Stream
    #[instrument(name="network", skip(self, prio, promises, bandwidth), fields(p = %self.local_pid))]
    pub async fn open(
        &self,
        prio: u8,
        promises: Promises,
        bandwidth: Bandwidth,
    ) -> Result<Stream, ParticipantError> {
        debug_assert!(prio <= network_protocol::HIGHEST_PRIO, "invalid prio");
        let (stream_s, stream_r) = oneshot::channel::<Stream>();
        let request = (prio, promises, bandwidth, stream_s);
        if let Err(e) = self.a2b_open_stream_s.lock().await.send(request) {
            debug!(?e, "bParticipant is already closed, notifying");
            return Err(ParticipantError::ParticipantDisconnected);
        }
        // a dropped answer channel means that the stream will never be opened
        let stream = stream_r.await.map_err(|_| {
            debug!("p2a_return_stream_r failed, closing participant");
            ParticipantError::ParticipantDisconnected
        })?;
        let sid = stream.sid;
        trace!(?sid, "opened stream");
        Ok(stream)
    }

    /// Use this method to handle [`Streams`] opened from the remote side,
    /// like the [`connected`] method of [`Network`]. This is the associated
    /// method to [`open`]. It's guaranteed that the order of [`open`] and
    /// `opened` is equal. The `nth` [`Streams`] on one side will represent the
    /// `nth` on the other side. A [`ParticipantError`] might be thrown if the
    /// `Participant` is already closed.
    ///
    /// # Examples
    /// ```rust
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr, Promises};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, connect on port 2110 and wait for the other side to open a stream
    /// // Note: It's quite unusual to actively connect, but then wait on a stream to be connected, usually the Application taking initiative want's to also create the first Stream.
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     # remote.listen(ListenAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
    ///     let p1 = network.connect(ConnectAddr::Tcp("127.0.0.1:2110".parse().unwrap())).await?;
    ///     # let p2 = remote.connected().await?;
    ///     # p2.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
    ///     let _s1 = p1.opened().await?;
    ///     drop(network);
    ///     # drop(remote);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`Streams`]: crate::api::Stream
    /// [`connected`]: Network::connected
    /// [`open`]: Participant::open
    #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
    pub async fn opened(&self) -> Result<Stream, ParticipantError> {
        let stream = match self.b2a_stream_opened_r.lock().await.recv().await {
            Some(stream) => stream,
            None => {
                // the sending side is gone, no stream will ever arrive again
                debug!("stream_opened_receiver failed, closing participant");
                return Err(ParticipantError::ParticipantDisconnected);
            },
        };
        let sid = stream.sid;
        debug!(?sid, "Receive opened stream");
        Ok(stream)
    }

    /// disconnecting a `Participant` in a async way.
    /// Use this rather than `Participant::Drop` if you want to close multiple
    /// `Participants`.
    ///
    /// This function will wait for all [`Streams`] to properly close, including
    /// all messages to be send before closing. If an error occurs with one
    /// of the messages.
    /// Except if the remote side already dropped the `Participant`
    /// simultaneously, then messages won't be send
    ///
    /// There is NO `disconnected` function in `Participant`, if a `Participant`
    /// is no longer reachable (e.g. as the network cable was unplugged) the
    /// `Participant` will fail all action, but needs to be manually
    /// disconnected, using this function.
    ///
    /// # Examples
    /// ```rust
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, Pid, ListenAddr, ConnectAddr};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on port `2030` TCP and opens returns their Pid and close connection.
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// let err = runtime.block_on(async {
    ///     network
    ///         .listen(ListenAddr::Tcp("127.0.0.1:2030".parse().unwrap()))
    ///         .await?;
    ///     # let keep_alive = remote.connect(ConnectAddr::Tcp("127.0.0.1:2030".parse().unwrap())).await?;
    ///     while let Ok(participant) = network.connected().await {
    ///         println!("Participant connected: {}", participant.remote_pid());
    ///         participant.disconnect().await?;
    ///         # //skip test here as it would be a endless loop
    ///         # break;
    ///     }
    ///     # Ok(())
    /// });
    /// drop(network);
    /// # drop(remote);
    /// # err
    /// # }
    /// ```
    ///
    /// [`Streams`]: crate::api::Stream
    #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
    pub async fn disconnect(self) -> Result<(), ParticipantError> {
        // Remove, Close and try_unwrap error when unwrap fails!
        debug!("Closing participant from network");

        //Streams will be closed by BParticipant
        match self.a2s_disconnect_s.lock().await.take() {
            Some(a2s_disconnect_s) => {
                let (finished_sender, finished_receiver) = oneshot::channel();
                // Participant is connecting to Scheduler here, not as usual
                // Participant<->BParticipant

                // If this is already dropped, we can assume the other side already freed its
                // resources.
                let _ = a2s_disconnect_s
                    .send((self.remote_pid, (Duration::from_secs(120), finished_sender)));
                match finished_receiver.await {
                    Ok(res) => {
                        match res {
                            Ok(()) => trace!("Participant is now closed"),
                            Err(ref e) => {
                                trace!(?e, "Error occurred during shutdown of participant")
                            },
                        };
                        res
                    },
                    Err(e) => {
                        //this is a bug. but as i am Participant i can't destroy the network
                        error!(
                            ?e,
                            "Failed to get a message back from the scheduler, seems like the \
                             network is already closed"
                        );
                        Err(ParticipantError::ProtocolFailedUnrecoverable)
                    },
                }
            },
            None => {
                warn!(
                    "seems like you are trying to disconnecting a participant after the network \
                     was already dropped. It was already dropped with the network!"
                );
                Err(ParticipantError::ParticipantDisconnected)
            },
        }
    }

    /// Returns the current approximation on the maximum bandwidth available.
    /// This WILL fluctuate based on the amount/size of sent messages.
    /// The value is the one most recently published on the bandwidth watch
    /// channel of this `Participant`.
    pub fn bandwidth(&self) -> f32 { *self.b2a_bandwidth_stats_r.borrow() }

    /// Returns the remote [`Pid`](network_protocol::Pid), i.e. the `Pid` of
    /// the side this `Participant` is connected to
    pub fn remote_pid(&self) -> Pid { self.remote_pid }
}

impl Stream {
    /// Assembles a `Stream` from the ids, parameters and channel handles
    /// handed in by the caller. Nothing is sent or received here.
    pub(crate) fn new(
        local_pid: Pid,
        remote_pid: Pid,
        sid: Sid,
        prio: Prio,
        promises: Promises,
        guaranteed_bandwidth: Bandwidth,
        send_closed: Arc<AtomicBool>,
        a2b_msg_s: crossbeam_channel::Sender<(Sid, Bytes)>,
        b2a_msg_recv_r: async_channel::Receiver<Bytes>,
        a2b_close_stream_s: mpsc::UnboundedSender<Sid>,
    ) -> Self {
        // Both handles are stored as `Option`s: the receiver is cleared once
        // the channel is found closed, the close-sender is taken on drop.
        let b2a_msg_recv_r = Some(b2a_msg_recv_r);
        let a2b_close_stream_s = Some(a2b_close_stream_s);
        Self {
            local_pid,
            remote_pid,
            sid,
            prio,
            promises,
            guaranteed_bandwidth,
            send_closed,
            a2b_msg_s,
            b2a_msg_recv_r,
            a2b_close_stream_s,
        }
    }

    /// Use to send an arbitrary message to the remote side, which must also
    /// have opened a `Stream` linked to this one. The message will be
    /// [`Serialized`], which actually is quite slow compared to most other
    /// calculations done. A faster method [`send_raw`] exists, when extra
    /// speed is needed. The other side needs to use the respective [`recv`]
    /// function and know the type sent.
    ///
    /// `send` is an exception to the `async` messages, as it's probably called
    /// quite often so it doesn't wait for execution. This also means that
    /// no feedback is provided. It is assumed that the Message got sent
    /// correctly. If an error occurred, the next call will return an Error.
    /// If the [`Participant`] disconnected it will also be unable to be used
    /// any more. A [`StreamError`] will be returned in the error case, e.g.
    /// when the `Stream` got closed already.
    ///
    /// Note when a `Stream` is dropped locally, it will still send all
    /// messages, though the `drop` will return immediately; however, when a
    /// [`Participant`] gets gracefully shut down, all remaining messages
    /// will be sent. If the `Stream` is dropped from the remote side no
    /// further messages are sent, because the remote side has no way of
    /// listening to them either way. If the last channel is destroyed (e.g.
    /// by losing the internet connection or non-graceful shutdown), pending
    /// messages are also dropped.
    ///
    /// # Example
    /// ```
    /// # use veloren_network::Promises;
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on Port `2200` and wait for a Stream to be opened, then answer `Hello World`
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     network.listen(ListenAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
    ///     # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2200".parse().unwrap())).await?;
    ///     # // keep it alive
    ///     # let _stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
    ///     let participant_a = network.connected().await?;
    ///     let mut stream_a = participant_a.opened().await?;
    ///     //Send  Message
    ///     stream_a.send("Hello World")?;
    ///     drop(network);
    ///     # drop(remote);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`send_raw`]: Stream::send_raw
    /// [`recv`]: Stream::recv
    /// [`Serialized`]: Serialize
    #[inline]
    pub fn send<M: Serialize>(&mut self, msg: M) -> Result<(), StreamError> {
        // serialize with this stream's parameters, then hand the owned
        // message over to the common send path
        let params = self.params();
        let message = Message::serialize(&msg, params);
        self.send_raw_move(message)
    }

    /// This method gives the option to skip multiple calls of [`bincode`] and
    /// [`compress`], e.g. in case the same Message needs to be sent on
    /// multiple `Streams` to multiple [`Participants`]. Other than that,
    /// the same rules apply as for [`send`].
    /// You need to create a Message via [`Message::serialize`].
    ///
    /// # Example
    /// ```rust
    /// # use veloren_network::Promises;
    /// use tokio::runtime::Runtime;
    /// use bincode;
    /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid, Message};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote1 = Network::new(Pid::new(), &runtime);
    /// # let remote2 = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     network.listen(ListenAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
    ///     # let remote1_p = remote1.connect(ConnectAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
    ///     # let remote2_p = remote2.connect(ConnectAddr::Tcp("127.0.0.1:2210".parse().unwrap())).await?;
    ///     # assert_eq!(remote1_p.remote_pid(), remote2_p.remote_pid());
    ///     # remote1_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
    ///     # remote2_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
    ///     let participant_a = network.connected().await?;
    ///     let participant_b = network.connected().await?;
    ///     let mut stream_a = participant_a.opened().await?;
    ///     let mut stream_b = participant_b.opened().await?;
    ///
    ///     //Prepare Message and decode it
    ///     let msg = Message::serialize("Hello World", stream_a.params());
    ///     //Send same Message to multiple Streams
    ///     stream_a.send_raw(&msg);
    ///     stream_b.send_raw(&msg);
    ///     drop(network);
    ///     # drop(remote1);
    ///     # drop(remote2);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`send`]: Stream::send
    /// [`Participants`]: crate::api::Participant
    /// [`compress`]: lz_fear::raw::compress2
    /// [`Message::serialize`]: crate::message::Message::serialize
    #[inline]
    pub fn send_raw(&mut self, message: &Message) -> Result<(), StreamError> {
        // build an owned copy so the caller keeps `message` for further sends
        let owned = Message {
            data: message.data.clone(),
            #[cfg(feature = "compression")]
            compressed: message.compressed,
        };
        self.send_raw_move(owned)
    }

    /// Common send path: pushes the payload of an owned `message`, tagged
    /// with this stream's `sid`, into `a2b_msg_s`.
    ///
    /// Returns [`StreamError::StreamClosed`] when `send_closed` is set or the
    /// channel rejects the message.
    fn send_raw_move(&mut self, message: Message) -> Result<(), StreamError> {
        let closed = self.send_closed.load(Ordering::Relaxed);
        if closed {
            return Err(StreamError::StreamClosed);
        }
        // only checked in debug builds
        #[cfg(debug_assertions)]
        message.verify(self.params());
        self.a2b_msg_s
            .send((self.sid, message.data))
            .map_err(StreamError::from)
    }

    /// Use `recv` to wait on a Message sent from the remote side by their
    /// `Stream`. The Message needs to implement [`DeserializeOwned`] and
    /// thus, the resulting type must already be known by the receiving side.
    /// If this is not known from the Application logic, one could use an
    /// `Enum` and then handle the received message via a `match` statement.
    ///
    /// A [`StreamError`] will be returned in the error case, e.g. when the
    /// `Stream` got closed already.
    ///
    /// # Example
    /// ```
    /// # use veloren_network::Promises;
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on Port `2220` and wait for a Stream to be opened, then listen on it
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     network.listen(ListenAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
    ///     # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2220".parse().unwrap())).await?;
    ///     # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
    ///     # stream_p.send("Hello World");
    ///     let participant_a = network.connected().await?;
    ///     let mut stream_a = participant_a.opened().await?;
    ///     //Recv  Message
    ///     println!("{}", stream_a.recv::<String>().await?);
    ///     drop(network);
    ///     # drop(remote);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    #[inline]
    pub async fn recv<M: DeserializeOwned>(&mut self) -> Result<M, StreamError> {
        // wait for the raw message first, then decode it into `M`
        let message = self.recv_raw().await?;
        message.deserialize()
    }

    /// The equivalent of [`send_raw`] but for [`recv`], no [`bincode`] or
    /// [`decompress`] is executed for performance reasons.
    ///
    /// # Example
    /// ```
    /// # use veloren_network::Promises;
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on Port `2230` and wait for a Stream to be opened, then listen on it
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     network.listen(ListenAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
    ///     # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2230".parse().unwrap())).await?;
    ///     # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
    ///     # stream_p.send("Hello World");
    ///     let participant_a = network.connected().await?;
    ///     let mut stream_a = participant_a.opened().await?;
    ///     //Recv  Message
    ///     let msg = stream_a.recv_raw().await?;
    ///     //Resend Message, without deserializing
    ///     stream_a.send_raw(&msg)?;
    ///     drop(network);
    ///     # drop(remote);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`send_raw`]: Stream::send_raw
    /// [`recv`]: Stream::recv
    /// [`decompress`]: lz_fear::raw::decompress_raw
    pub async fn recv_raw(&mut self) -> Result<Message, StreamError> {
        // a cleared receiver means the stream was already found closed
        let receiver = match self.b2a_msg_recv_r.as_mut() {
            Some(receiver) => receiver,
            None => return Err(StreamError::StreamClosed),
        };
        if let Ok(data) = receiver.recv().await {
            return Ok(Message {
                data,
                #[cfg(feature = "compression")]
                compressed: self.promises.contains(Promises::COMPRESSED),
            });
        }
        // channel is closed: forget the receiver so later calls return early
        // (prevents a panic)
        self.b2a_msg_recv_r = None;
        Err(StreamError::StreamClosed)
    }

    /// Use `try_recv` to check for a Message sent from the remote side by their
    /// `Stream`. This function does not block and returns immediately. It's
    /// intended for use in non-async context only. Other than that, the
    /// same rules apply as for [`recv`].
    ///
    /// # Example
    /// ```
    /// # use veloren_network::Promises;
    /// use tokio::runtime::Runtime;
    /// use veloren_network::{Network, ListenAddr, ConnectAddr, Pid};
    ///
    /// # fn main() -> std::result::Result<(), Box<dyn std::error::Error>> {
    /// // Create a Network, listen on Port `2240` and wait for a Stream to be opened, then listen on it
    /// let runtime = Runtime::new().unwrap();
    /// let network = Network::new(Pid::new(), &runtime);
    /// # let remote = Network::new(Pid::new(), &runtime);
    /// runtime.block_on(async {
    ///     network.listen(ListenAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
    ///     # let remote_p = remote.connect(ConnectAddr::Tcp("127.0.0.1:2240".parse().unwrap())).await?;
    ///     # let mut stream_p = remote_p.open(4, Promises::ORDERED | Promises::CONSISTENCY, 0).await?;
    ///     # stream_p.send("Hello World");
    ///     # std::thread::sleep(std::time::Duration::from_secs(1));
    ///     let participant_a = network.connected().await?;
    ///     let mut stream_a = participant_a.opened().await?;
    ///     //Try Recv  Message
    ///     println!("{:?}", stream_a.try_recv::<String>()?);
    ///     drop(network);
    ///     # drop(remote);
    ///     # Ok(())
    /// })
    /// # }
    /// ```
    ///
    /// [`recv`]: Stream::recv
    #[inline]
    pub fn try_recv<M: DeserializeOwned>(&mut self) -> Result<Option<M>, StreamError> {
        // a cleared receiver means the stream was already found closed
        let receiver = match self.b2a_msg_recv_r.as_mut() {
            Some(receiver) => receiver,
            None => return Err(StreamError::StreamClosed),
        };
        match receiver.try_recv() {
            Ok(data) => {
                let message = Message {
                    data,
                    #[cfg(feature = "compression")]
                    compressed: self.promises.contains(Promises::COMPRESSED),
                };
                Ok(Some(message.deserialize()?))
            },
            // nothing queued right now
            Err(async_channel::TryRecvError::Empty) => Ok(None),
            Err(async_channel::TryRecvError::Closed) => {
                // forget the receiver so later calls return early (prevents a
                // panic)
                self.b2a_msg_recv_r = None;
                Err(StreamError::StreamClosed)
            },
        }
    }

    /// Returns the [`StreamParams`] of this `Stream`, i.e. its `promises`.
    /// These are what [`Message::serialize`] expects as its second argument.
    ///
    /// [`Message::serialize`]: crate::message::Message::serialize
    pub fn params(&self) -> StreamParams {
        let promises = self.promises;
        StreamParams { promises }
    }
}

/// Two `Participant`s are equal when they point at the same remote [`Pid`].
impl core::cmp::PartialEq for Participant {
    fn eq(&self, other: &Self) -> bool {
        // local_pid is deliberately left out: 2 Participants from different
        // networks should match if they are the "same" remote
        let (ours, theirs) = (&self.remote_pid, &other.remote_pid);
        ours == theirs
    }
}

fn actively_wait<T, F>(name: &'static str, mut finished_receiver: oneshot::Receiver<T>, f: F)
where
    F: FnOnce(T) + Send + 'static,
    T: Send + 'static,
{
    const CHANNEL_ERR: &str = "Something is wrong in internal scheduler/participant coding";

    if let Ok(handle) = tokio::runtime::Handle::try_current() {
        // When in Async Context WE MUST NOT SYNC BLOCK (as a deadlock might occur as
        // other is queued behind). And we CANNOT join our Future_Handle
        trace!("async context detected, defer shutdown");
        handle.spawn(async move {
            match finished_receiver.await {
                Ok(data) => f(data),
                Err(e) => error!("{}{}: {}", name, CHANNEL_ERR, e),
            }
        });
    } else {
        let mut cnt = 0;
        loop {
            use tokio::sync::oneshot::error::TryRecvError;
            match finished_receiver.try_recv() {
                Ok(data) => {
                    f(data);
                    break;
                },
                Err(TryRecvError::Closed) => panic!("{}{}", name, CHANNEL_ERR),
                Err(TryRecvError::Empty) => {
                    trace!("activly sleeping");
                    cnt += 1;
                    if cnt > 10 {
                        error!("Timeout waiting for shutdown, dropping");
                        break;
                    }
                    std::thread::sleep(Duration::from_millis(100) * cnt);
                },
            }
        }
    };
}

/// Dropping a `Network` sends a shutdown request through
/// `shutdown_network_s` and then waits for the confirmation via
/// `actively_wait`.
impl Drop for Network {
    #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
    fn drop(&mut self) {
        trace!("Dropping Network");
        // the receiving end of the request answers through `finished_sender`
        let (finished_sender, finished_receiver) = oneshot::channel();
        let shutdown_network_s = self.shutdown_network_s.take().unwrap();
        if let Err(e) = shutdown_network_s.send(finished_sender) {
            warn!(?e, "Runtime seems to be dropped already");
            return;
        }
        actively_wait("network", finished_receiver, |()| {
            info!("Network dropped gracefully")
        });
    }
}

impl Drop for Participant {
    #[instrument(name="remote", skip(self), fields(p = %self.remote_pid))]
    #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
    fn drop(&mut self) {
        const SHUTDOWN_ERR: &str = "Error while dropping the participant, couldn't send all \
                                    outgoing messages, dropping remaining";
        const SCHEDULER_ERR: &str =
            "Something is wrong in internal scheduler coding or you dropped the runtime to early";
        // ignore closed, as we need to send it even though we disconnected the
        // participant from network
        debug!("Shutting down Participant");

        match self.a2s_disconnect_s.try_lock() {
            Err(e) => debug!(?e, "Participant is beeing dropped by Network right now"),
            Ok(mut s) => match s.take() {
                None => info!("Participant already has been shutdown gracefully"),
                Some(a2s_disconnect_s) => {
                    debug!("Disconnect from Scheduler");
                    let (finished_sender, finished_receiver) = oneshot::channel();
                    match a2s_disconnect_s
                        .send((self.remote_pid, (Duration::from_secs(10), finished_sender)))
                    {
                        Err(e) => warn!(?e, SCHEDULER_ERR),
                        Ok(()) => {
                            actively_wait("participant", finished_receiver, |d| match d {
                                Ok(()) => info!("Participant dropped gracefully"),
                                Err(e) => error!(?e, SHUTDOWN_ERR),
                            });
                        },
                    }
                },
            },
        }
    }
}

/// Dropping a `Stream` announces its `sid` on `a2b_close_stream_s`, unless
/// `send_closed` is already set. The drop itself does not wait for anything.
impl Drop for Stream {
    #[instrument(name="remote", skip(self), fields(p = %self.remote_pid))]
    #[instrument(name="network", skip(self), fields(p = %self.local_pid))]
    fn drop(&mut self) {
        // send if closed is unnecessary but doesn't hurt, we must not crash
        let sid = self.sid;
        if self.send_closed.load(Ordering::Relaxed) {
            trace!(?sid, "Stream Drop not needed");
            return;
        }
        debug!(?sid, "Shutting down Stream");
        // `new` always stores `Some`, so the sender is expected to be present;
        // `if let` instead of `unwrap` keeps the destructor free of a panic
        // path (a panic while already unwinding would abort the process).
        if let Some(a2b_close_stream_s) = self.a2b_close_stream_s.take() {
            if let Err(e) = a2b_close_stream_s.send(sid) {
                debug!(
                    ?e,
                    "bparticipant part of a gracefully shutdown was already closed"
                );
            }
        }
    }
}

/// Debug output is limited to the two pids; the channel handles are left out.
impl std::fmt::Debug for Participant {
    #[inline]
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        let Self {
            local_pid,
            remote_pid,
            ..
        } = self;
        write!(
            f,
            "Participant {{ local_pid: {:?}, remote_pid: {:?} }}",
            local_pid, remote_pid,
        )
    }
}

/// A failed send on a crossbeam channel is reported as a closed stream; the
/// rejected message is discarded.
impl<T> From<crossbeam_channel::SendError<T>> for StreamError {
    fn from(_: crossbeam_channel::SendError<T>) -> Self { Self::StreamClosed }
}

/// A failed send on a crossbeam channel is reported as a closed network; the
/// rejected message is discarded.
impl<T> From<crossbeam_channel::SendError<T>> for NetworkError {
    fn from(_: crossbeam_channel::SendError<T>) -> Self { Self::NetworkClosed }
}

impl<T> From<mpsc::error::SendError<T>> for NetworkError {
    fn from(_err: mpsc::error::SendError<T>) -> Self { NetworkError::NetworkClosed }
}

impl From<oneshot::error::RecvError> for NetworkError {
    fn from(_err: oneshot::error::RecvError) -> Self { NetworkError::NetworkClosed }
}

impl From<std::io::Error> for NetworkError {
    fn from(_err: std::io::Error) -> Self { NetworkError::NetworkClosed }
}

/// Wraps a bincode error into `StreamError::Deserialize`, keeping the
/// original error as payload.
impl From<Box<bincode::ErrorKind>> for StreamError {
    fn from(err: Box<bincode::ErrorKind>) -> Self { Self::Deserialize(err) }
}

impl core::fmt::Display for StreamError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            StreamError::StreamClosed => write!(f, "stream closed"),
            #[cfg(feature = "compression")]
            StreamError::Compression(err) => write!(f, "compression error on message: {}", err),
            StreamError::Deserialize(err) => write!(f, "deserialize error on message: {}", err),
        }
    }
}

impl core::fmt::Display for ParticipantError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            ParticipantError::ParticipantDisconnected => write!(f, "Participant disconnect"),
            ParticipantError::ProtocolFailedUnrecoverable => {
                write!(f, "underlying protocol failed unrecoverable")
            },
        }
    }
}

impl core::fmt::Display for NetworkError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            NetworkError::NetworkClosed => write!(f, "Network closed"),
            NetworkError::ListenFailed(_) => write!(f, "Listening failed"),
            NetworkError::ConnectFailed(_) => write!(f, "Connecting failed"),
        }
    }
}

impl core::fmt::Display for NetworkConnectError {
    fn fmt(&self, f: &mut core::fmt::Formatter<'_>) -> core::fmt::Result {
        match self {
            NetworkConnectError::Io(e) => write!(f, "Io error: {}", e),
            NetworkConnectError::Handshake(e) => write!(f, "Handshake error: {}", e),
            NetworkConnectError::InvalidSecret => {
                write!(f, "You specified the wrong secret on your second channel")
            },
        }
    }
}

/// `PartialEq` is implemented by hand as it's super convenient in tests.
/// Two errors are equal when they are the same variant and their payloads
/// compare equal.
impl core::cmp::PartialEq for StreamError {
    fn eq(&self, other: &Self) -> bool {
        match (self, other) {
            (StreamError::StreamClosed, StreamError::StreamClosed) => true,
            #[cfg(feature = "compression")]
            (StreamError::Compression(err), StreamError::Compression(other_err)) => {
                err == other_err
            },
            (StreamError::Deserialize(err), StreamError::Deserialize(other_err)) => {
                partial_eq_bincode(err, other_err)
            },
            // differing variants never compare equal
            _ => false,
        }
    }
}

// All error types of this module implement `std::error::Error` with the
// default trait methods only; their text comes from the `Display` impls.
impl std::error::Error for StreamError {}
impl std::error::Error for ParticipantError {}
impl std::error::Error for NetworkError {}
impl std::error::Error for NetworkConnectError {}
